package org.apache.flink.cdc.connectors.gaussdb.source;

import java.util.Properties;

public class GaussDBSqlSource {

    public static <T> Builder<T> builder() {
        return new Builder<>();
    }

    public static class Builder<T> {

        private String pluginName = "mppdb_decoding";
        private String slotName = "replication_slot";
        private int port = 8001; // ha port 8001
        private String hostname;
        private String database;
        private String username;
        private String password;
        private String schemaList;
        private String tableList;
        private String replicationMode;
        private String url;
        private String sending_batch;
        private String parallel_decode_num;
        private String decode_style;

        public Builder<T> getUrl() {
            this.url = "jdbc:gaussdb://" + hostname + ":" + port + "/" + database + "";
            return this;
        }

        public Builder<T> decode_style(String decode_style) {
            this.decode_style = decode_style;
            return this;
        }

        public Builder<T> parallel_decode_num(String parallel_decode_num) {
            this.parallel_decode_num = parallel_decode_num;
            return this;
        }

        public Builder<T> sending_batch(String sending_batch) {
            this.sending_batch = sending_batch;
            return this;
        }

        public Builder<T> replicationMode(String replicationMode) {
            this.replicationMode = replicationMode;
            return this;
        }

        public Builder<T> decodingPluginName(String pluginName) {
            this.pluginName = pluginName;
            return this;
        }

        public Builder<T> hostname(String hostname) {
            this.hostname = hostname;
            return this;
        }

        /**
         * Integer port number of the PostgreSQL database server.
         */
        public Builder<T> port(int port) {
            this.port = port;
            return this;
        }

        /**
         * The name of the PostgreSQL database from which to stream the changes.
         */
        public Builder<T> database(String database) {
            this.database = database;
            return this;
        }


        public Builder<T> schemaList(String schemaList) {
            this.schemaList = schemaList;
            return this;
        }


        public Builder<T> tableList(String tableList) {
            this.tableList = tableList;
            return this;
        }

        public Builder<T> username(String username) {
            this.username = username;
            return this;
        }

        /**
         * Password to use when connecting to the PostgreSQL database server.
         */
        public Builder<T> password(String password) {
            this.password = password;
            return this;
        }

        public Builder<T> slotName(String slotName) {
            this.slotName = slotName;
            return this;
        }


        public GaussDBSourceFunction<T> build() {
            Properties properties = new Properties();
            properties.setProperty("user", username);     // 用户
            properties.setProperty("password", password); // 密码
            properties.setProperty("white-table-list", tableList);  // 单表：public.tablename 全表：public.* 多表：public.tableA,public.tableB
            properties.setProperty("replication_slot", slotName.toLowerCase()); // 复制槽
            properties.setProperty("replication_Mode", replicationMode); // 逻辑复制槽模式： 1 创建逻辑复制槽 ，2 使用逻辑复制槽 ，3 删除逻辑复制槽

            if (decode_style != null) {
                properties.setProperty("decode-style", decode_style);   // 解码格式 (j 是json ,t 是text)
            }
            if (sending_batch != null) {
                properties.setProperty("sending-batch", sending_batch);   // 批量发送解码结果
            }
            if (parallel_decode_num != null) {
                properties.setProperty("parallel-decode-num", parallel_decode_num); // 解码线程并行度
            }
            return new GaussDBSourceFunction<T>(getUrl().url, properties);
        }
    }
}